Writing Arbiters (pt 2 - Off-chain Oracles)
Off-chain oracles are arbitration services that run outside the blockchain but submit their decisions on-chain via the TrustedOracleArbiter contract. Unlike on-chain arbiters (see pt 1) that validate within smart contract execution, oracles perform arbitration in an external environment where they can access APIs, run complex computations, maintain state databases, or integrate with external systems.
This guide covers implementing oracle services using the Alkahest SDKs. The patterns here apply to building production oracle infrastructure that can validate work submissions against arbitrary criteria.
Understanding Your Role as an Oracle
Example scenario: Alice wants Bob to capitalize a string ("hello world" → "HELLO WORLD"). She creates an escrow offering payment, specifying you (Charlie) as the trusted oracle who will verify if Bob's work is correct. Bob submits his result, requests your arbitration, and if you approve, he can claim the payment.
Your oracle service needs to:
- Listen for arbitration requests that specify you as the oracle
- Extract the fulfillment data and original demand from attestations
- Validate the fulfillment according to the demand criteria
- Submit your approval or rejection decision on-chain
The TrustedOracleArbiter contract handles the on-chain logic - your job is to implement the validation logic and submit decisions.
For a complete example of how oracles fit into the escrow/fulfillment flow, see "Escrow Flow (pt 2 - Job Trading)".
Three Validation Patterns
| Pattern | Returns | State | Escrow Access | Use Case |
|---|---|---|---|---|
| Contextless | bool | Oracle maintains state | No | Signature verification, identity validation, format checking |
| Demand-Based | bool | Stateless | Yes - reads demand | Custom validation per escrow, test case validation |
| Asynchronous | null | Job queue | Yes - reads demand | Time-based monitoring, long-running computations |
Decision flowchart:
- Does validation require waiting over time?
  - Yes → Asynchronous
  - No → Does validation need the escrow's demand parameters?
    - Yes → Demand-Based
    - No → Contextless
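The two questions in the flowchart can be written as a small helper. This is only an illustrative sketch: `choose_pattern` and its parameter names are hypothetical and are not part of the Alkahest SDKs.

```python
def choose_pattern(needs_waiting: bool, needs_demand: bool) -> str:
    """Map the two flowchart questions to a validation pattern name."""
    if needs_waiting:
        # Validation spans time, so the decision has to be deferred.
        return "Asynchronous"
    if needs_demand:
        # Validation depends on the buyer's per-escrow parameters.
        return "Demand-Based"
    # Validation depends only on the fulfillment and the oracle's own state.
    return "Contextless"
```

Note that the waiting question is asked first: a validation that both waits over time and reads the demand is still classified as asynchronous.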
Pattern 1: Contextless Validation
Contextless oracles validate fulfillments based purely on the fulfillment's intrinsic properties and the oracle's own maintained state, without referencing the original escrow demand. This pattern is useful for building reusable validation services that work across any escrow.
When to use: Signature verification, format checking, identity validation, standard verification against a maintained registry.
Why contextless: The oracle provides a generic service (e.g., "I verify signatures from known identities") rather than validating against buyer-specific criteria. The validation logic doesn't depend on what Alice requested - only on what Bob submitted.
Composability: Because contextless oracles are generic and reusable, they can be easily composed with other arbiters using logical combinators like AllArbiter and AnyArbiter. For example, you could require that a fulfillment is both signed by a verified identity (contextless oracle) AND meets specific job criteria (demand-based oracle). See "Escrow Flow (pt 3 - Composing Demands)" for composition patterns.
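To illustrate the AND/OR semantics only, the sketch below combines two local predicates in plain Python. It does not call AllArbiter or AnyArbiter, which are on-chain contracts; `all_of`, `any_of`, `is_signed`, and `is_uppercase` are hypothetical names chosen for this example.

```python
from typing import Callable, Iterable

Check = Callable[[str], bool]


def all_of(checks: Iterable[Check]) -> Check:
    """Approve only if every check approves (AND semantics)."""
    frozen = tuple(checks)
    return lambda fulfillment: all(check(fulfillment) for check in frozen)


def any_of(checks: Iterable[Check]) -> Check:
    """Approve if at least one check approves (OR semantics)."""
    frozen = tuple(checks)
    return lambda fulfillment: any(check(fulfillment) for check in frozen)


# Hypothetical stand-ins for a contextless check and a job-specific check.
def is_signed(fulfillment: str) -> bool:
    return fulfillment.startswith("signed:")


def is_uppercase(fulfillment: str) -> bool:
    return fulfillment.split(":", 1)[-1].isupper()


both = all_of([is_signed, is_uppercase])
either = any_of([is_signed, is_uppercase])
```

Here `both("signed:HELLO")` approves while `both("HELLO")` rejects, and `either("HELLO")` approves because one of the two checks passes.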
Reference implementations:
- Python: test_identity.py
- TypeScript: offchain-oracle-identity.test.ts
- Rust: offchain_oracle_identity.rs
Step 1: Define fulfillment format and registry state
Define what fulfillments look like and what state you maintain:
- Python
- TypeScript
- Rust
import json
from dataclasses import dataclass
from typing import Dict

from eth_account import Account
from eth_account.messages import encode_defunct

# Fulfillment format (what sellers submit)
@dataclass
class IdentityFulfillment:
    pubkey: str
    nonce: int
    data: str
    signature: str

# Oracle's internal registry (identity address -> current nonce)
# This represents the oracle's concept of which identities are valid
# and tracks nonces to prevent replay attacks
identity_registry: Dict[str, int] = {}
import { parseAbiParameters, recoverMessageAddress } from "viem";
// Fulfillment format (what sellers submit)
type IdentityFulfillment = {
pubkey: `0x${string}`;
nonce: number;
data: string;
signature: `0x${string}`;
};
// Oracle's internal registry (identity address -> current nonce)
// This represents the oracle's concept of which identities are valid
// and tracks nonces to prevent replay attacks
const identityRegistry = new Map<`0x${string}`, number>();
// ABI for decoding StringObligation data
const stringObligationAbi = parseAbiParameters("(string item)");
use std::{collections::HashMap, sync::OnceLock};
use alloy::primitives::{Address, Signature, keccak256};
use serde::{Deserialize, Serialize}; // needed for the derives below
use tokio::sync::Mutex;
// Fulfillment format (what sellers submit)
#[derive(Debug, Clone, Serialize, Deserialize)]
struct IdentityFulfillment {
pubkey: Address,
nonce: u64,
data: String,
signature: Vec<u8>,
}
// Oracle's internal registry (identity address -> current nonce)
// This represents the oracle's concept of which identities are valid
// and tracks nonces to prevent replay attacks
static IDENTITY_REGISTRY: OnceLock<Mutex<HashMap<Address, u64>>> = OnceLock::new();
fn identity_registry() -> &'static Mutex<HashMap<Address, u64>> {
IDENTITY_REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
}
Step 2: Initialize your registry
Before starting the listener, populate your oracle's state:
- Python
- TypeScript
- Rust
async def run_contextless_oracle(oracle_client):
    # Register known identities with starting nonces
    identity_registry.clear()
    identity_registry[identity_address_1.lower()] = 0
    identity_registry[identity_address_2.lower()] = 0
    # In production: load from database
    # ... rest of oracle setup ...
async function runContextlessOracle(charlieClient: AlkahestClient) {
// Register known identities with starting nonces
identityRegistry.set(identityAddress1, 0);
identityRegistry.set(identityAddress2, 0);
// In production: load from database
// ... rest of oracle setup ...
}
async fn run_contextless_oracle(
charlie_client: AlkahestClient<BaseExtensions>
) -> eyre::Result<()> {
// Register known identities with starting nonces
{
let mut registry = identity_registry().lock().await;
registry.insert(identity_address_1, 0);
registry.insert(identity_address_2, 0);
// In production: load from database
}
// ... rest of oracle setup ...
}
Step 3: Implement the validation callback
The validation callback receives the fulfillment attestation and the demand data. A contextless oracle ignores the demand and checks the fulfillment only against its own registry:
- Python
- TypeScript
- Rust
def verify_identity_decision(attestation, demand) -> bool:
    """
    Verify an identity fulfillment by checking:
    1. The signature is valid
    2. The nonce is greater than the last seen nonce
    3. The recovered address matches the claimed pubkey
    """
    try:
        # Step 3a: Extract fulfillment data
        obligation_str = oracle_client.oracle.extract_obligation_data(attestation)
        payload = json.loads(obligation_str)
        parsed = IdentityFulfillment(
            pubkey=payload['pubkey'],
            nonce=payload['nonce'],
            data=payload['data'],
            signature=payload['signature']
        )

        # Step 3b: Check against oracle's registry
        pubkey_lower = parsed.pubkey.lower()
        if pubkey_lower not in identity_registry:
            return False  # Unknown identity - not in our registry

        # Step 3c: Verify nonce progression (replay protection)
        current_nonce = identity_registry[pubkey_lower]
        if parsed.nonce <= current_nonce:
            return False

        # Step 3d: Verify signature format
        sig_len = len(parsed.signature) if parsed.signature else 0
        if not parsed.signature or sig_len not in [130, 132]:
            return False

        # Step 3e: Verify cryptographic signature
        message = f"{parsed.data}:{parsed.nonce}"
        encoded_message = encode_defunct(text=message)
        try:
            recovered = Account.recover_message(encoded_message, signature=parsed.signature)
        except Exception:
            return False
        if recovered.lower() != pubkey_lower:
            return False  # Signature mismatch

        # Step 3f: Update state and approve
        identity_registry[pubkey_lower] = parsed.nonce
        return True
    except Exception:
        return False
async ({ attestation, demand }) => {
// Step 3a: Extract fulfillment data
const obligation = charlieClient.extractObligationData(
stringObligationAbi,
attestation,
);
const payload = obligation[0]?.item;
if (!payload) return false;
// Step 3b: Parse the fulfillment payload
let parsed: IdentityFulfillment;
try {
parsed = JSON.parse(payload) as IdentityFulfillment;
} catch {
return false;
}
// Step 3c: Check against oracle's registry
const currentNonce = identityRegistry.get(parsed.pubkey);
if (currentNonce === undefined) {
return false; // Unknown identity - not in our registry
}
// Step 3d: Verify nonce progression (replay protection)
if (parsed.nonce <= currentNonce) {
return false;
}
// Step 3e: Verify signature format
if (typeof parsed.signature !== "string" || parsed.signature.length !== 132) {
return false;
}
// Step 3f: Verify cryptographic signature
const message = `${parsed.data}:${parsed.nonce}`;
let recovered: `0x${string}`;
try {
recovered = await recoverMessageAddress({
message,
signature: parsed.signature,
});
} catch {
return false;
}
if (recovered.toLowerCase() !== parsed.pubkey.toLowerCase()) {
return false; // Signature mismatch
}
// Step 3g: Update state and approve
identityRegistry.set(parsed.pubkey, parsed.nonce);
return true;
}
fn verify_identity(
awd: &alkahest_rs::clients::oracle::AttestationWithDemand,
) -> Pin<Box<dyn Future<Output = Option<bool>> + Send>> {
let attestation = awd.attestation.clone();
Box::pin(async move {
// Step 3a: Extract fulfillment data
let obligation: StringObligation::ObligationData =
match StringObligation::ObligationData::abi_decode(&attestation.data) {
Ok(o) => o,
Err(_) => return Some(false),
};
// Step 3b: Parse the fulfillment payload
let payload = obligation.item.clone();
let parsed: IdentityFulfillment = match serde_json::from_str(&payload) {
Ok(p) => p,
Err(_) => return Some(false),
};
// Step 3c: Check against oracle's registry
let mut registry = identity_registry().lock().await;
let Some(current_nonce) = registry.get_mut(&parsed.pubkey) else {
return Some(false); // Unknown identity - not in our registry
};
// Step 3d: Verify nonce progression (replay protection)
if parsed.nonce <= *current_nonce {
return Some(false);
}
// Step 3e: Verify cryptographic signature
if parsed.signature.len() != 65 {
return Some(false);
}
let sig = match Signature::try_from(parsed.signature.as_slice()) {
Ok(s) => s,
Err(_) => return Some(false),
};
let message = format!("{}:{}", parsed.data, parsed.nonce);
let hash = keccak256(message.as_bytes());
let Ok(recovered) = sig.recover_address_from_prehash(&hash) else {
return Some(false);
};
if recovered != parsed.pubkey {
return Some(false); // Signature mismatch
};
// Step 3f: Update state and approve
*current_nonce = parsed.nonce;
Some(true)
})
}
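The signature format checks above rely on the fact that a 65-byte signature is 130 hex characters, or 132 with a `0x` prefix. The following sketch makes that pre-check explicit. `is_well_formed_signature` is a hypothetical helper, slightly stricter than the snippets above because it also rejects non-hex characters. It does not verify the signature cryptographically.

```python
import string

HEX_DIGITS = set(string.hexdigits)


def is_well_formed_signature(signature: object) -> bool:
    """Return True if `signature` is 65 bytes of hex, with or without '0x'."""
    if not isinstance(signature, str):
        return False
    # Strip an optional 0x prefix before measuring the length.
    hex_part = signature[2:] if signature[:2].lower() == "0x" else signature
    # 65 bytes (r: 32, s: 32, v: 1) at two hex characters per byte.
    if len(hex_part) != 130:
        return False
    return all(ch in HEX_DIGITS for ch in hex_part)
```

Running a cheap check like this before signature recovery lets the oracle reject malformed payloads without entering the recovery code path.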
Step 4: Set up listener and cleanup
Wire everything together with the listener:
- Python
- TypeScript
- Rust
from alkahest_py import ArbitrationMode

def callback(decision):
    # Optional: called after arbitration is submitted on-chain
    print(f"Arbitrated {decision.attestation.uid}: {decision.decision}")

# Listen and validate
decisions = await oracle_client.oracle.arbitrate_many(
    verify_identity_decision,
    callback,
    ArbitrationMode.AllUnarbitrated,
    timeout_seconds=10.0,
)

# Cleanup
identity_registry.clear()
const listener = await charlieClient.oracle.listenAndArbitrate(
// ... validation callback from Step 3 ...
{ skipAlreadyArbitrated: true },
);
// When shutting down the oracle
listener.unwatch();
identityRegistry.clear();
let listen_result = charlie_oracle
.arbitrate_many_async(
verify_identity,
|decision| async move {
// Optional: called after arbitration is submitted on-chain
println!("Arbitrated {}: {}", decision.attestation.uid, decision.decision);
},
ArbitrationMode::AllUnarbitrated,
)
.await?;
// Cleanup
charlie_oracle.unsubscribe(listen_result.subscription_id).await?;
Complete pattern:
- Define fulfillment format and registry state
- Initialize your oracle's registry/database
- Implement validation callback that:
  - Extracts fulfillment data
  - Checks against oracle's internal state
  - Performs validation (e.g., signature checks)
  - Updates state if needed
  - Returns true or false
- Set up listener with callback and cleanup
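The replay protection in this pattern comes down to one rule: accept a nonce only if the identity is known and the nonce is strictly greater than the last accepted one. Below is a minimal in-memory sketch of that rule. `NonceRegistry` is a hypothetical class, not an SDK type, and it deliberately omits the signature verification that the guide performs before updating state.

```python
from typing import Dict


class NonceRegistry:
    """Tracks the highest accepted nonce per identity (keys are lowercased)."""

    def __init__(self) -> None:
        self._nonces: Dict[str, int] = {}

    def register(self, address: str, start_nonce: int = 0) -> None:
        """Add a known identity with its starting nonce."""
        self._nonces[address.lower()] = start_nonce

    def accept(self, address: str, nonce: int) -> bool:
        """Accept `nonce` only for a known identity and only if it advances."""
        key = address.lower()
        current = self._nonces.get(key)
        if current is None or nonce <= current:
            return False  # Unknown identity or replayed/stale nonce
        self._nonces[key] = nonce
        return True
```

For example, after `registry.register("0xAbC")`, the call `registry.accept("0xabc", 1)` succeeds once and fails when repeated. In production this state would live in a database, as the setup step notes.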
Pattern 2: Demand-Based Validation
Demand-based oracles validate fulfillments against specific criteria provided by the buyer in the escrow demand. Each escrow can specify different requirements, and the oracle validates that Bob's fulfillment meets Alice's exact specifications.
When to use: Custom validation criteria per escrow, comparing a fulfillment against the buyer's specifications, computational validation with test cases.
Why demand-based: Alice specifies exactly what she wants (e.g., "capitalize these specific test cases"), and the oracle verifies Bob's work matches those requirements. Different escrows have different demands, all validated by the same oracle.
Example flow:
- Alice creates escrow → demands oracle=charlie, data="capitalize hello world", offers 100 tokens
- Bob fulfills → submits "HELLO WORLD", references Alice's escrow via refUID
- Bob requests arbitration → asks Charlie to validate
- Charlie validates → extracts Bob's result and Alice's query, checks if "HELLO WORLD" matches uppercase("hello world"), submits decision on-chain
- Bob claims payment → uses approved attestation to collect escrow
Reference implementations:
- Python: test_capitalization.py
- TypeScript: offchain-oracle-capitalization.test.ts
- Rust: offchain_oracle_capitalization.rs
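Charlie's check in the example flow reduces to comparing Bob's result with the uppercased query. The sketch below shows only that comparison. `validate_capitalization` is a hypothetical name, and extracting the query and result from the attestations is left to the SDK calls shown in the steps that follow.

```python
def validate_capitalization(query: str, result: str) -> bool:
    """Approve only if `result` is exactly the uppercased form of `query`."""
    return result == query.upper()


# Alice's query and Bob's submission from the example flow.
approved = validate_capitalization("hello world", "HELLO WORLD")
rejected = validate_capitalization("hello world", "Hello World")
```

The comparison is exact, so a partially capitalized or padded result is rejected.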
Step 1: Define your demand format
Decide what parameters buyers provide in their escrow demands:
- Python
- TypeScript
- Rust
import json
import subprocess
from dataclasses import dataclass
from typing import List

@dataclass
class ShellTestCase:
    input: str
    output: str

@dataclass
class ShellOracleDemand:
    description: str
    test_cases: List[ShellTestCase]
import {
decodeAbiParameters,
encodeAbiParameters,
hexToBytes,
parseAbiParameters,
stringToHex,
} from "viem";
import { exec as execCallback } from "node:child_process";
import { promisify } from "node:util";
const execAsync = promisify(execCallback);
type ShellTestCase = {
input: string;
output: string;
};
type ShellOracleDemand = {
description: string;
test_cases: ShellTestCase[];
};
// ABIs for decoding
const stringObligationAbi = parseAbiParameters("(string item)");
const shellDemandAbi = parseAbiParameters("(bytes payload)");
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TestCase {
input: String,
output: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CommandTestDemand {
description: String,
test_cases: Vec<TestCase>,
}
Buyers encode this as JSON in the TrustedOracleArbiter demand's data field.
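As a rough sketch of that encoding, the helpers below serialize a demand to UTF-8 JSON bytes on the buyer side and parse it back on the oracle side. `encode_demand` and `decode_demand` are hypothetical names. How the resulting bytes are placed into the TrustedOracleArbiter demand on-chain is SDK-specific and not shown here.

```python
import json
from dataclasses import asdict, dataclass
from typing import List


@dataclass
class ShellTestCase:
    input: str
    output: str


@dataclass
class ShellOracleDemand:
    description: str
    test_cases: List[ShellTestCase]


def encode_demand(demand: ShellOracleDemand) -> bytes:
    """Serialize the demand to UTF-8 JSON bytes (buyer side)."""
    return json.dumps(asdict(demand)).encode("utf-8")


def decode_demand(raw: bytes) -> ShellOracleDemand:
    """Parse UTF-8 JSON bytes back into a demand (oracle side)."""
    payload = json.loads(raw.decode("utf-8"))
    cases = [ShellTestCase(**case) for case in payload["test_cases"]]
    return ShellOracleDemand(description=payload["description"], test_cases=cases)
```

Parsing into typed dataclasses rather than using the raw dictionary means a demand with missing or unexpected fields fails at decode time, before any validation logic runs.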
Step 2: Implement validation with demand
The callback receives both the fulfillment attestation and the demand data directly. The demand is passed from the ArbitrationRequested event, so there's no need to fetch the escrow from chain.
- Python
- TypeScript
- Rust
async def decision_function(attestation, demand):
    """Evaluate whether the fulfillment meets the demand requirements"""
    try:
        # Step 2a: Extract fulfillment
        statement = oracle_client.oracle.extract_obligation_data(attestation)
    except Exception as e:
        print(f"Failed to extract obligation: {e}")
        return False

    # Step 2b: Parse demand from callback argument
    try:
        demand_json = json.loads(bytes(demand).decode('utf-8'))
    except Exception as e:
        print(f"Failed to parse demand: {e}")
        return False

    # Step 2c: Apply validation logic using demand parameters
    # Run each test case to verify Bob's submission works correctly
    for case in demand_json['test_cases']:
        command = f'echo "$INPUT" | {statement}'
        try:
            result = subprocess.run(
                ["bash", "-c", command],
                env={"INPUT": case['input']},
                capture_output=True,
                text=True,
                timeout=5
            )
            if result.returncode != 0:
                return False
            output = result.stdout.rstrip('\n')
            if output != case['output']:
                return False
        except Exception:
            return False
    return True
async ({ attestation, demand }) => {
// Step 2a: Extract fulfillment
const obligation = charlieClient.extractObligationData(
stringObligationAbi,
attestation,
);
const statement = obligation[0]?.item;
if (!statement) return false;
// Step 2b: Parse demand from callback argument
const outerDemand = charlieClient.arbiters.general.trustedOracle.decodeDemand(demand);
const demandData = decodeAbiParameters(shellDemandAbi, outerDemand.data);
const payloadHex = demandData[0]?.payload;
if (!payloadHex) return false;
let payload: ShellOracleDemand;
try {
const payloadJson = new TextDecoder().decode(hexToBytes(payloadHex));
payload = JSON.parse(payloadJson) as ShellOracleDemand;
} catch {
return false;
}
// Step 2c: Apply validation logic using demand parameters
// Run each test case to verify Bob's submission works correctly
for (const testCase of payload.test_cases) {
const command = `echo "$INPUT" | ${statement}`;
try {
const { stdout } = await execAsync(command, {
env: {
...process.env,
INPUT: testCase.input,
},
shell: "/bin/bash",
});
if (stdout.trimEnd() !== testCase.output) {
return false;
}
} catch {
return false;
}
}
return true;
}
move |awd: &AttestationWithDemand| {
let attestation = awd.attestation.clone();
let demand_bytes = awd.demand.clone();
let client = charlie_client_arc.clone();
async move {
// Step 2a: Extract fulfillment
let Ok(fulfillment) = client
.extract_obligation_data::<StringObligation::ObligationData>(&attestation)
else {
return Some(false);
};
let submitted_command = fulfillment.item;
// Step 2b: Parse demand from callback argument
let Ok(test_demand) = serde_json::from_slice::<CommandTestDemand>(
demand_bytes.as_ref()
) else {
return Some(false);
};
// Step 2c: Apply validation logic using demand parameters
// Run each test case to verify Bob's submission works correctly
for case in test_demand.test_cases {
let full_command = format!("echo \"$INPUT\" | {}", submitted_command);
let output = match Command::new("bash")
.arg("-lc")
.arg(&full_command)
.env("INPUT", &case.input)
.output()
{
Ok(output) if output.status.success() => {
String::from_utf8_lossy(&output.stdout)
.trim_end()
.to_owned()
}
_ => return Some(false),
};
if output != case.output {
return Some(false);
}
}
Some(true)
}
}
Step 3: Understanding the data flow
ArbitrationRequested Event
  └─ obligation: fulfillment UID
  └─ oracle: charlie_address
  └─ demand: bytes (your custom demand format)
       │
       ▼
Callback receives (attestation, demand)
  │
  ├─ attestation: Fulfillment Attestation
  │    └─ data: StringObligation { item: "tr '[:lower:]' '[:upper:]'" }
  │    └─ refUID: points to escrow
  │
  └─ demand: bytes (passed directly from event)
       └─ Your custom format (e.g., JSON with test_cases)
The demand data is passed directly to your callback via the ArbitrationRequested event, so you don't need to fetch the escrow attestation from chain.
Step 4: Set up the listener
- Python
- TypeScript
- Rust
from alkahest_py import ArbitrationMode

def callback(decision):
    """Called when arbitration completes"""
    print(f"Arbitrated {decision.attestation.uid}: {decision.decision}")

# Listen and arbitrate
decisions = await oracle_client.oracle.arbitrate_many(
    decision_function,
    callback,
    ArbitrationMode.AllUnarbitrated,
    timeout_seconds=10.0,
)

# Verify decisions
assert all(d.decision for d in decisions), "Oracle rejected fulfillment"
const listener = await charlieClient.arbiters.general.trustedOracle.arbitrateMany(
// ... validation callback from Step 2 ...
{ mode: "allUnarbitrated" },
);
// When shutting down
listener.unwatch();
let listen_result = charlie_oracle
.arbitrate_many_async(
// ... validation callback from Step 2 ...
|decision| async move {
println!("Arbitrated {}: {}", decision.attestation.uid, decision.decision);
},
&ArbitrationMode::AllUnarbitrated,
)
.await?;
charlie_oracle.unsubscribe(listen_result.subscription_id).await?;
Complete pattern:
- Define demand format (your oracle's API)
- Implement validation callback:
  - Extract fulfillment data from the attestation
  - Parse demand data from the callback argument
  - Apply validation logic comparing fulfillment to demand
  - Return true or false
- Set up listener with callback
Pattern 3: Asynchronous Validation
Asynchronous oracles handle validation that cannot complete immediately. They require monitoring over time, accumulating data, or waiting for external conditions. The oracle schedules work for later, and a background worker submits the decision when ready.
When to use: Time-based monitoring (uptime checks, deadline validation), accumulating evidence over multiple observations, long-running computations, waiting for consensus from multiple sources.
Why asynchronous: Some validation is inherently time-based. For example, "verify this server stays up for 24 hours" cannot be validated instantly - you must schedule checks over time and make a final decision later.
Architecture:
Listener (receives requests) → Job Queue (stores pending work)
                                    ↓
                      Worker (processes jobs over time)
                                    ↓
                     On-chain Submission (final decision)
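The architecture above can be sketched with a plain `asyncio.Queue`. Everything here is a stand-in chosen for illustration: the job tuple, the `listener` and `worker` coroutines, and the `submit` callback that replaces the real on-chain submission. No Alkahest SDK calls are made.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Tuple

Job = Tuple[str, List[bool]]  # (fulfillment uid, observed check results)
Submit = Callable[[str, bool], Awaitable[None]]


async def listener(queue: "asyncio.Queue[Job]", requests: List[Job]) -> None:
    """Enqueue each request and return without making a decision."""
    for job in requests:
        await queue.put(job)


async def worker(queue: "asyncio.Queue[Job]", min_uptime: float, submit: Submit) -> None:
    """Process queued jobs and submit one final decision per job."""
    while True:
        uid, checks = await queue.get()
        uptime = sum(checks) / max(len(checks), 1)
        await submit(uid, uptime >= min_uptime)
        queue.task_done()


async def main() -> Dict[str, bool]:
    decisions: Dict[str, bool] = {}

    async def submit(uid: str, decision: bool) -> None:
        decisions[uid] = decision  # Stand-in for the on-chain submission

    queue: "asyncio.Queue[Job]" = asyncio.Queue()
    task = asyncio.create_task(worker(queue, 0.75, submit))
    await listener(queue, [("job-a", [True, True, True, False]), ("job-b", [True, False])])
    await queue.join()  # Wait until every queued job has been decided
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return decisions
```

The listener never decides anything itself, which mirrors the deferred decision that defines this pattern. A production service would typically persist the queue so pending jobs survive a restart.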
Reference implementations:
- Python: test_uptime.py
- TypeScript: offchain-oracle-uptime.test.ts
- Rust: offchain_oracle_uptime.rs
Step 1: Define demand format and job state
Define both the demand buyers will provide and the internal state for tracking scheduled work:
- Python
- TypeScript
- Rust
import asyncio
import json
from dataclasses import dataclass, field
from typing import Dict, List, Optional

# Demand format (buyers provide this)
@dataclass
class UptimeDemand:
    service_url: str
    min_uptime: float  # Required uptime as a fraction (0.0-1.0)
    start: int  # Start of monitoring window (timestamp in seconds)
    end: int  # End of monitoring window (timestamp in seconds)
    check_interval_secs: int

# Internal job state (your oracle tracks this)
@dataclass
class PingEvent:
    delay_secs: float
    success: bool

@dataclass
class UptimeJob:
    min_uptime: float
    schedule: List[PingEvent]
    demand: bytes

# Shared state between listener and worker
@dataclass
class SchedulerContext:
    job_db: Dict[str, UptimeJob] = field(default_factory=dict)
    url_index: Dict[str, str] = field(default_factory=dict)
    notify: Optional[asyncio.Event] = field(default_factory=asyncio.Event)
import { decodeAbiParameters, encodeAbiParameters, hexToBytes, parseAbiParameters, stringToHex } from "viem";
// Demand format (buyers provide this)
type UptimeDemand = {
service_url: string;
min_uptime: number; // Required uptime as a fraction (0.0-1.0)
start: number; // Start of monitoring window (timestamp in seconds)
end: number; // End of monitoring window (timestamp in seconds)
check_interval_secs: number;
};
// Internal job state (your oracle tracks this)
type PingEvent = {
delayMs: number;
success: boolean;
};
type UptimeJob = {
minUptime: number;
schedule: PingEvent[];
demandData: `0x${string}`;
};
// Shared state between listener and worker
type SchedulerContext = {
jobDb: Map<`0x${string}`, UptimeJob>;
urlIndex: Map<string, `0x${string}`>;
waiters: Array<() => void>;
};
let schedulerContext: SchedulerContext | undefined;
// ABIs for decoding
const stringObligationAbi = parseAbiParameters("(string item)");
const uptimeDemandAbi = parseAbiParameters("(bytes payload)");
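use alloy::primitives::{Bytes, FixedBytes};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use std::time::Duration as StdDuration; // alias inferred from its use below
use tokio::sync::{Mutex, Notify};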
// Demand format (buyers provide this)
#[derive(Debug, Clone, Serialize, Deserialize)]
struct UptimeDemand {
service_url: String,
min_uptime: f64, // Required uptime as a fraction (0.0-1.0)
start: u64, // Start of monitoring window (timestamp in seconds)
end: u64, // End of monitoring window (timestamp in seconds)
check_interval_secs: u64,
}
// Internal job state (your oracle tracks this)
#[derive(Debug, Clone)]
struct PingEvent {
delay: StdDuration,
success: bool,
}
#[derive(Debug, Clone)]
struct UptimeJob {
min_uptime: f64,
schedule: Vec<PingEvent>,
demand: Bytes,
}
// Shared state between listener and worker
type JobDb = Arc<Mutex<HashMap<FixedBytes<32>, UptimeJob>>>;
#[derive(Clone)]
struct SchedulerContext {
job_db: JobDb,
notify: Arc<Notify>,
url_index: Arc<Mutex<HashMap<String, FixedBytes<32>>>>,
client: Arc<AlkahestClient<BaseExtensions>>,
}
static SCHEDULER_STATE: OnceLock<Mutex<Option<SchedulerContext>>> = OnceLock::new();
Step 2: Implement the scheduling callback
The listener callback schedules work but does not make a decision. It returns None (null in TypeScript) to defer the decision to the worker:
- Python
- TypeScript
- Rust
async def schedule_decision(attestation, demand):
    """Schedule uptime monitoring but DON'T make a decision yet."""
    try:
        # Step 2a: Extract service URL from fulfillment
        statement = oracle_client.oracle.extract_obligation_data(attestation)
        # Look up the fulfillment UID from our URL index
        uid = ctx.url_index.get(statement)
        if uid is None or uid in ctx.job_db:
            return None

        # Step 2b: Parse demand from callback argument
        demand_json = json.loads(bytes(demand).decode("utf-8"))
        # Verify URL matches
        if statement != demand_json["service_url"]:
            return None

        # Step 2c: Create monitoring schedule
        total_span = max(demand_json["end"] - demand_json["start"], 1)
        interval = max(demand_json["check_interval_secs"], 1)
        checks = max(total_span // interval, 1)
        schedule = []
        for i in range(checks):
            schedule.append(PingEvent(
                delay_secs=0.1 + i * 0.025,
                success=True,  # Will be determined by actual ping
            ))

        # Step 2d: Store job for background processing
        ctx.job_db[uid] = UptimeJob(
            min_uptime=demand_json["min_uptime"],
            schedule=schedule,
            demand=bytes(demand),
        )
        ctx.notify.set()  # Wake up worker

        # Step 2e: Return None to defer decision to the worker
        return None
    except Exception:
        return None
async ({ attestation, demand }) => {
const ctx = getScheduler();
if (!ctx) return null;
// Step 2a: Extract service URL from fulfillment
const obligation = charlieClient.extractObligationData(
stringObligationAbi,
attestation,
);
const statement = obligation[0];
if (!statement?.item) return null;
const fulfillmentUid = ctx.urlIndex.get(statement.item);
if (!fulfillmentUid || ctx.jobDb.has(fulfillmentUid)) return null;
// Step 2b: Parse demand from callback argument
const outerDemand = charlieClient.arbiters.general.trustedOracle.decodeDemand(demand);
const demandData = decodeAbiParameters(uptimeDemandAbi, outerDemand.data);
const payloadHex = demandData[0]?.payload;
if (!payloadHex) return null;
let parsed: UptimeDemand;
try {
const json = new TextDecoder().decode(hexToBytes(payloadHex));
parsed = JSON.parse(json) as UptimeDemand;
} catch {
return null;
}
// Step 2c: Create monitoring schedule
const totalSpan = Math.max(parsed.end - parsed.start, 1);
const interval = Math.max(parsed.check_interval_secs, 1);
const checks = Math.max(Math.floor(totalSpan / interval), 1);
const schedule: PingEvent[] = [];
for (let i = 0; i < checks; i++) {
schedule.push({ delayMs: 100 + i * 25, success: true });
}
// Step 2d: Store job for background processing
const rawDemandBytes = encodeAbiParameters(uptimeDemandAbi, demandData) as `0x${string}`;
ctx.jobDb.set(fulfillmentUid, {
minUptime: parsed.min_uptime,
schedule,
demandData: rawDemandBytes,
});
notifyScheduler(ctx); // Wake up worker
// Step 2e: Return null to defer decision to the worker
return null;
}
fn schedule_pings(
awd: &alkahest_rs::clients::oracle::AttestationWithDemand,
) -> Pin<Box<dyn Future<Output = Option<bool>> + Send>> {
let attestation = awd.attestation.clone();
let demand_bytes = awd.demand.clone();
Box::pin(async move {
let ctx_opt = scheduler_state().lock().await.clone();
let Some(ctx) = ctx_opt else {
return None;
};
// Step 2a: Extract service URL from fulfillment
let Ok(statement) = ctx.client
.extract_obligation_data::<StringObligation::ObligationData>(&attestation)
else {
return None;
};
let url = statement.item.clone();
let Some(uid) = ctx.url_index.lock().await.get(&url).cloned() else {
return None;
};
// Step 2b: Parse demand from callback argument
let Ok(parsed_demand) = serde_json::from_slice::<UptimeDemand>(
demand_bytes.as_ref()
) else {
return None;
};
// Step 2c: Create monitoring schedule
let total_span = parsed_demand.end.saturating_sub(parsed_demand.start).max(1);
let interval = parsed_demand.check_interval_secs.max(1);
let checks = (total_span / interval).max(1) as usize;
let mut schedule = Vec::with_capacity(checks);
for i in 0..checks {
schedule.push(PingEvent {
delay: StdDuration::from_millis(100 + (i * 25) as u64),
success: true, // Will be determined by actual ping
});
}
// Step 2d: Store job for background processing
ctx.job_db.lock().await.entry(uid).or_insert(UptimeJob {
min_uptime: parsed_demand.min_uptime,
schedule,
demand: demand_bytes,
});
ctx.notify.notify_one(); // Wake up worker
// Step 2e: Return None to defer decision to the worker
None
})
}
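The schedule arithmetic in Step 2c can be isolated into a small function. The snippets above use compressed delays (100 ms plus 25 ms per check), which suits tests. The sketch below instead spaces checks by the real interval, which is an assumption about how a production oracle might schedule them. `build_check_offsets` is a hypothetical helper.

```python
from typing import List


def build_check_offsets(start: int, end: int, check_interval_secs: int) -> List[int]:
    """Return offsets (seconds after `start`) at which to ping the service."""
    total_span = max(end - start, 1)  # Guard against empty or inverted windows
    interval = max(check_interval_secs, 1)  # Guard against a zero interval
    checks = max(total_span // interval, 1)  # Always schedule at least one check
    return [i * interval for i in range(checks)]
```

For a one-hour window checked every ten minutes, `build_check_offsets(0, 3600, 600)` yields six checks at offsets 0 through 3000 seconds. The `max(..., 1)` guards match the ones in the scheduling callback, so a malformed demand still produces a single check rather than an empty schedule.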
Step 3: Implement the background worker
Create a worker that processes scheduled jobs and submits decisions:
- Python
- TypeScript
- Rust
async def run_worker(ctx: SchedulerContext, oracle_client) -> None:
    """Background worker that processes uptime jobs and submits decisions."""
    while True:
        # Step 3a: Get next job from queue
        uid = None
        job = None
        for k, v in list(ctx.job_db.items()):
            uid = k
            job = v
            del ctx.job_db[k]
            break

        if uid is not None and job is not None:
            # Step 3b: Execute the monitoring schedule
            successes = 0
            total_checks = max(len(job.schedule), 1)
            for ping in job.schedule:
                await asyncio.sleep(ping.delay_secs)
                # In production: actually ping the service
                if ping.success:
                    successes += 1

            # Step 3c: Calculate result and make decision
            uptime = successes / total_checks
            decision = uptime >= job.min_uptime

            # Step 3d: Submit decision on-chain
            # CRITICAL: Unlike synchronous oracles where arbitrate_many
            # handles submission automatically, async oracles must MANUALLY
            # call oracle.arbitrate() to submit their decision
            await oracle_client.oracle.arbitrate(uid, list(job.demand), decision)
        else:
            # Step 3e: Wait for new work
            ctx.notify.clear()
            try:
                await asyncio.wait_for(ctx.notify.wait(), timeout=2.0)
            except asyncio.TimeoutError:
                if not ctx.job_db:
                    break  # No work and no notifications - exit
function startSchedulerWorker(
ctx: SchedulerContext,
arbiters: typeof charlieClient.arbiters,
) {
let active = true;
const promise = (async () => {
while (active) {
// Step 3a: Get next job from queue
let entry = dequeueJob(ctx);
if (!entry) {
const hasJob = await waitForJob(ctx, 2000);
if (!hasJob) {
if (ctx.jobDb.size === 0) break;
continue;
}
entry = dequeueJob(ctx);
if (!entry) continue;
}
const { uid, job } = entry;
// Step 3b: Execute the monitoring schedule
let successes = 0;
const totalChecks = Math.max(job.schedule.length, 1);
for (const ping of job.schedule) {
await Bun.sleep(ping.delayMs);
// In production: actually ping the service
if (ping.success) successes += 1;
}
// Step 3c: Calculate result and make decision
const uptime = successes / totalChecks;
const decision = uptime >= job.minUptime;
// Step 3d: Submit decision on-chain
// CRITICAL: Unlike synchronous oracles where listenAndArbitrate
// handles submission automatically, async oracles must MANUALLY
// call arbitrate() to submit their decision
await arbiters.general.trustedOracle.arbitrate(uid, job.demandData, decision);
}
})();
return {
stop: () => {
active = false;
notifyScheduler(ctx);
},
promise,
};
}
async fn run_worker(
job_db: JobDb,
notify: Arc<Notify>,
arbiters: ArbitersModule,
) {
loop {
// Step 3a: Get next job from queue
let maybe_job = {
let mut db = job_db.lock().await;
if let Some((&uid, job)) = db.iter().next() {
let job = job.clone();
db.remove(&uid);
Some((uid, job))
} else {
None
}
};
if let Some((uid, job)) = maybe_job {
// Step 3b: Execute the monitoring schedule
let mut successes = 0usize;
let total_checks = job.schedule.len().max(1);
for ping in job.schedule {
tokio::time::sleep(ping.delay).await;
// In production: actually ping the service
if ping.success {
successes += 1;
}
}
// Step 3c: Calculate result and make decision
let uptime = successes as f64 / total_checks as f64;
let decision = uptime >= job.min_uptime;
// Step 3d: Submit decision on-chain
// CRITICAL: Unlike synchronous oracles where arbitrate_many_async
// handles submission automatically, async oracles must MANUALLY
// call arbitrate() to submit their decision
arbiters
.trusted_oracle()
.arbitrate(uid, job.demand.clone(), decision)
.await
.expect("oracle arbitration tx should succeed");
} else {
// Step 3e: Wait for new work
match tokio::time::timeout(StdDuration::from_secs(2), notify.notified()).await {
Ok(_) => continue,
Err(_) => {
if job_db.lock().await.is_empty() {
break; // No work and no notifications - exit
}
}
}
}
}
}
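The decision made in Steps 3b and 3c reduces to a pure function of the ping results, which makes it easy to unit-test separately from the queue and the chain. Below is a minimal sketch, assuming the ping outcomes have already been collected as booleans; `uptime_decision` is a hypothetical helper for illustration, not part of the Alkahest SDK.

```python
from typing import Sequence


def uptime_decision(results: Sequence[bool], min_uptime: float) -> bool:
    """Return True when the fraction of successful pings meets min_uptime.

    Mirrors the worker logic above: the divisor is clamped to at least 1,
    so an empty schedule yields 0.0 uptime instead of a division by zero.
    """
    total_checks = max(len(results), 1)
    successes = sum(1 for ok in results if ok)
    return successes / total_checks >= min_uptime
```

Keeping this calculation separate from the worker loop means the threshold behaviour can be checked with plain assertions, while the worker only handles queueing and submission.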
Step 4: Wire everything together
Set up the listener, worker, and shared state:
- Python
- TypeScript
- Rust
from alkahest_py import ArbitrationMode
# Step 4a: Initialize shared state
ctx = SchedulerContext()
ctx.url_index[service_url] = fulfillment_uid
# Step 4b: Start background worker
worker_task = asyncio.create_task(run_worker(ctx, oracle_client))
# Step 4c: Start listener (scheduling callback from Step 2)
decisions = await oracle_client.oracle.arbitrate_many(
    schedule_decision,
    callback,
    ArbitrationMode.AllUnarbitrated,
    timeout_seconds=10.0,
)
# Step 4d: Wait for worker and cleanup
await asyncio.wait_for(worker_task, timeout=10.0)
// Step 4a: Initialize shared state
const scheduler: SchedulerContext = {
jobDb: new Map(),
urlIndex: new Map(),
waiters: [],
};
scheduler.urlIndex.set(serviceUrl, fulfillmentUid);
setScheduler(scheduler);
// Step 4b: Start background worker
const worker = startSchedulerWorker(scheduler, charlieClient.arbiters);
// Step 4c: Start listener (scheduling callback from Step 2)
const listener = await charlieClient.arbiters.general.trustedOracle.arbitrateMany(
// ... scheduling callback ...
{ mode: "allUnarbitrated" },
);
// Step 4d: Wait and cleanup
listener.unwatch();
worker.stop();
await worker.promise;
setScheduler(undefined);
// Step 4a: Initialize shared state
let job_db: JobDb = Arc::new(Mutex::new(HashMap::new()));
let scheduler_notify = Arc::new(Notify::new());
{
let mut slot = scheduler_state().lock().await;
*slot = Some(SchedulerContext {
job_db: Arc::clone(&job_db),
notify: Arc::clone(&scheduler_notify),
url_index: Arc::clone(&url_index),
client: Arc::new(charlie_client.clone()),
});
}
// Step 4b: Start background worker
let worker = tokio::spawn(async move {
run_worker(worker_db, worker_notify, worker_arbiters).await;
});
// Step 4c: Start listener (scheduling callback from Step 2)
let listen_result = charlie_oracle
.arbitrate_many_async(
schedule_pings,
|_| async {},
ArbitrationMode::AllUnarbitrated,
)
.await?;
// Step 4d: Wait and cleanup
worker.await.unwrap();
charlie_oracle.unsubscribe(listen_result.subscription_id).await?;
{
let mut slot = scheduler_state().lock().await;
*slot = None;
}
Complete asynchronous oracle pattern:
- Define demand format and job state structures
- Implement scheduling callback that:
  - Extracts fulfillment data
  - Parses demand from callback argument
  - Creates job schedule
  - Stores job in shared queue
  - Returns None/null (defers decision)
- Implement background worker that:
  - Polls job queue
  - Executes scheduled work
  - Makes decision based on results
  - Calls arbitrate() to submit decision on-chain
- Wire together with shared state and cleanup
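The steps above can be exercised end to end without a chain connection. The following is a minimal sketch, assuming an in-memory `asyncio.Queue` as the shared job store and pre-recorded ping results in place of real monitoring. `Job`, `FakeOracle`, `make_scheduler`, and `demo` are hypothetical names invented for illustration and are not Alkahest SDK APIs.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Job:
    demand: bytes
    results: list[bool]  # stand-in for real ping outcomes
    min_uptime: float


@dataclass
class FakeOracle:
    """Hypothetical stand-in for the SDK client; records submitted decisions."""
    submitted: dict[str, bool] = field(default_factory=dict)

    async def arbitrate(self, uid: str, demand: bytes, decision: bool) -> None:
        self.submitted[uid] = decision


def make_scheduler(queue: asyncio.Queue):
    """Build a scheduling callback that enqueues work and defers the decision."""
    def schedule(uid: str, demand: bytes, results: list[bool], min_uptime: float):
        queue.put_nowait((uid, Job(demand, results, min_uptime)))
        return None  # no decision yet; the worker submits it later
    return schedule


async def run_worker(queue: asyncio.Queue, oracle: FakeOracle) -> None:
    """Drain the queue, decide each job, and submit the decision manually."""
    while not queue.empty():
        uid, job = queue.get_nowait()
        uptime = sum(job.results) / max(len(job.results), 1)
        await oracle.arbitrate(uid, job.demand, uptime >= job.min_uptime)


async def demo() -> dict[str, bool]:
    queue: asyncio.Queue = asyncio.Queue()
    oracle = FakeOracle()
    schedule = make_scheduler(queue)
    schedule("0x01", b"demand", [True, True, False], 0.6)   # 2/3 uptime
    schedule("0x02", b"demand", [False, False, True], 0.6)  # 1/3 uptime
    await run_worker(queue, oracle)
    return oracle.submitted
```

Running `asyncio.run(demo())` approves the first job and rejects the second. A real worker would wait for notifications instead of exiting when the queue is empty, as the worker in Step 3 does.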
Key differences from synchronous patterns:
| Aspect | Synchronous (Patterns 1 & 2) | Asynchronous (Pattern 3) |
|---|---|---|
| Callback return value | true or false | None/null (defers decision) |
| Decision submission | Automatic via SDK | Manual via arbitrate() |
| Architecture | Single callback function | Callback + background worker |
| State management | Optional (Pattern 1 only) | Required (job queue) |
| Timing | Instant validation | Delayed validation over time |
Choosing the Right Pattern
Quick decision tree:
- Does validation require waiting/monitoring over time?
  - YES → Pattern 3: Asynchronous
- Does validation need the escrow's demand parameters?
  - YES → Pattern 2: Demand-Based
  - NO → Pattern 1: Contextless
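The order of the questions matters: the waiting check takes precedence, so a validation that both reads the demand and monitors over time is still Pattern 3. A small sketch of that precedence, with `Pattern` and `choose_pattern` as hypothetical names used only for illustration:

```python
from enum import Enum


class Pattern(Enum):
    CONTEXTLESS = "Pattern 1: Contextless"
    DEMAND_BASED = "Pattern 2: Demand-Based"
    ASYNCHRONOUS = "Pattern 3: Asynchronous"


def choose_pattern(needs_waiting: bool, needs_demand: bool) -> Pattern:
    """Apply the decision tree top to bottom; the first YES wins."""
    if needs_waiting:
        return Pattern.ASYNCHRONOUS
    if needs_demand:
        return Pattern.DEMAND_BASED
    return Pattern.CONTEXTLESS
```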
Detailed comparison:
| Validation Type | Pattern | Complexity | Example |
|---|---|---|---|
| Signature verification | Contextless | Low | Verify identity attestations |
| Format/standard checking | Contextless | Low | Validate JSON schemas |
| Test case validation | Demand-Based | Medium | Run buyer-specified tests |
| Computational verification | Demand-Based | Medium | Check algorithmic solutions |
| Uptime monitoring | Asynchronous | High | Verify 99% uptime over 24h |
| Consensus voting | Asynchronous | High | Wait for multiple approvals |
Production Considerations
Error Handling and Logging
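Catch errors inside the validation callback so that one malformed attestation cannot crash the listener, reject anything that cannot be parsed, and log every decision with its attestation UID: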
- Python
- TypeScript
- Rust
import logging

logger = logging.getLogger(__name__)

# In validation callback
try:
    data = parse_data(attestation)
except Exception as e:
    logger.error(f"Failed to parse attestation {attestation.uid}: {e}")
    return False  # Reject invalid data

# Log decisions
logger.info(
    f"Oracle decision for {attestation.uid}: "
    f"{'approved' if decision else 'rejected'}"
)
// In validation callback
// Declared outside the try block so it stays in scope for the validation that follows
let data;
try {
  data = await parseData(attestation);
} catch (error) {
  console.error("Failed to parse attestation:", attestation.uid, error);
  return false; // Reject invalid data
}

// Log decisions
console.log(
  `Oracle decision for ${attestation.uid}: ${decision ? "approved" : "rejected"}`
);
use tracing::{info, warn, error};
// In validation callback
let Ok(data) = parse_data(&attestation) else {
error!("Failed to parse attestation: {:?}", attestation.uid);
return Some(false); // Reject invalid data
};
// Log decisions
info!(
uid = ?attestation.uid,
decision = decision,
"Oracle decision: {}",
if decision { "approved" } else { "rejected" }
);
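The catch-reject-log pattern above can be factored into a wrapper so that every callback gets the same treatment. Below is a sketch, assuming a synchronous callback that takes a single attestation-like object with a `uid` attribute. `reject_on_error` and `validate_capitalized` are hypothetical names, and `SimpleNamespace` stands in for a real attestation.

```python
import functools
import logging
from types import SimpleNamespace
from typing import Any, Callable

logger = logging.getLogger("oracle")


def reject_on_error(validate: Callable[[Any], bool]) -> Callable[[Any], bool]:
    """Wrap a validation callback so any exception becomes a logged rejection."""
    @functools.wraps(validate)
    def wrapper(attestation: Any) -> bool:
        uid = getattr(attestation, "uid", "<unknown>")
        try:
            decision = bool(validate(attestation))
        except Exception:
            # logger.exception records the traceback along with the message
            logger.exception("Validation failed for attestation %s", uid)
            return False
        logger.info(
            "Oracle decision for %s: %s", uid, "approved" if decision else "rejected"
        )
        return decision
    return wrapper


@reject_on_error
def validate_capitalized(attestation: Any) -> bool:
    # Raises AttributeError when data is not a string; the wrapper rejects it
    return attestation.data == attestation.data.upper()


approved = validate_capitalized(SimpleNamespace(uid="0x01", data="HELLO WORLD"))
malformed = validate_capitalized(SimpleNamespace(uid="0x02", data=None))
```

Here `approved` is `True` and `malformed` is `False`. Rejecting on error is the conservative choice for an escrow oracle: a fulfillment that cannot be validated does not release payment.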
State Persistence
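Asynchronous oracles keep pending jobs in memory, so a restart drops every job that was scheduled but not yet arbitrated. Persist the job queue to survive restarts: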
- Python
- TypeScript
- Rust
import sqlite3
import json


class PersistentJobDb:
    def __init__(self, db_path: str):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS jobs (
                uid TEXT PRIMARY KEY,
                job TEXT NOT NULL
            )
        """)
        self.conn.commit()

    def insert_job(self, uid: str, job: UptimeJob):
        job_json = json.dumps({
            'fulfillment_uid': job.fulfillment_uid,
            'service_url': job.service_url,
            'min_uptime': job.min_uptime,
            'checks_remaining': job.checks_remaining,
            'successes': job.successes,
            'total_checks': job.total_checks,
        })
        self.conn.execute(
            "INSERT OR REPLACE INTO jobs (uid, job) VALUES (?, ?)",
            (uid, job_json)
        )
        self.conn.commit()

    def get_next_job(self):
        cursor = self.conn.execute("SELECT uid, job FROM jobs LIMIT 1")
        row = cursor.fetchone()
        if not row:
            return None
        uid, job_json = row
        job_data = json.loads(job_json)
        job = UptimeJob(**job_data)
        self.conn.execute("DELETE FROM jobs WHERE uid = ?", (uid,))
        self.conn.commit()
        return (uid, job)
import Database from "better-sqlite3";
class PersistentJobDb {
private db: Database.Database;
constructor(filepath: string) {
this.db = new Database(filepath);
this.db.exec(`
CREATE TABLE IF NOT EXISTS jobs (
uid TEXT PRIMARY KEY,
job TEXT NOT NULL
)
`);
}
insertJob(uid: `0x${string}`, job: UptimeJob) {
const stmt = this.db.prepare("INSERT OR REPLACE INTO jobs (uid, job) VALUES (?, ?)");
stmt.run(uid, JSON.stringify(job));
}
getNextJob(): { uid: `0x${string}`; job: UptimeJob } | undefined {
const row = this.db.prepare("SELECT uid, job FROM jobs LIMIT 1").get() as
| { uid: string; job: string }
| undefined;
if (!row) return undefined;
this.db.prepare("DELETE FROM jobs WHERE uid = ?").run(row.uid);
return {
uid: row.uid as `0x${string}`,
job: JSON.parse(row.job) as UptimeJob,
};
}
}
// Use sled, rocksdb, or postgres for production
use sled::Db;

struct PersistentJobDb {
    db: Db,
}

impl PersistentJobDb {
    fn insert_job(&self, uid: &FixedBytes<32>, job: &UptimeJob) -> eyre::Result<()> {
        let value = bincode::serialize(job)?;
        self.db.insert(uid.as_slice(), value)?;
        Ok(())
    }

    fn get_next_job(&self) -> eyre::Result<Option<(FixedBytes<32>, UptimeJob)>> {
        // transpose() turns Option<Result<..>> into Result<Option<..>>, so a
        // storage error propagates instead of looking like an empty queue
        if let Some((key, value)) = self.db.iter().next().transpose()? {
            let uid = FixedBytes::<32>::from_slice(&key);
            let job: UptimeJob = bincode::deserialize(&value)?;
            self.db.remove(&key)?;
            Ok(Some((uid, job)))
        } else {
            Ok(None)
        }
    }
}
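The stores above delete a job as soon as it is dequeued, so a crash between dequeue and the `arbitrate()` call would lose that job. One way to close the gap is to mark a job as claimed and delete it only after the decision has been submitted. The sketch below assumes SQLite and a job that serializes to a plain dict. `AckJobDb` and its method names are hypothetical and shown only to illustrate the claim/acknowledge idea.

```python
import json
import sqlite3
from typing import Optional


class AckJobDb:
    """Job store where a job is removed only after it is explicitly completed."""

    def __init__(self, db_path: str = ":memory:"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS jobs ("
            " uid TEXT PRIMARY KEY,"
            " job TEXT NOT NULL,"
            " claimed INTEGER NOT NULL DEFAULT 0)"
        )
        self.conn.commit()

    def insert_job(self, uid: str, job: dict) -> None:
        self.conn.execute(
            "INSERT OR REPLACE INTO jobs (uid, job, claimed) VALUES (?, ?, 0)",
            (uid, json.dumps(job)),
        )
        self.conn.commit()

    def claim_next(self) -> Optional[tuple[str, dict]]:
        """Mark the oldest unclaimed job as in progress and return it."""
        row = self.conn.execute(
            "SELECT uid, job FROM jobs WHERE claimed = 0 ORDER BY rowid LIMIT 1"
        ).fetchone()
        if row is None:
            return None
        self.conn.execute("UPDATE jobs SET claimed = 1 WHERE uid = ?", (row[0],))
        self.conn.commit()
        return row[0], json.loads(row[1])

    def complete(self, uid: str) -> None:
        """Delete a job once its decision has been submitted on-chain."""
        self.conn.execute("DELETE FROM jobs WHERE uid = ?", (uid,))
        self.conn.commit()

    def release_claims(self) -> int:
        """Call on startup: make jobs claimed by a crashed run available again."""
        cur = self.conn.execute("UPDATE jobs SET claimed = 0 WHERE claimed = 1")
        self.conn.commit()
        return cur.rowcount
```

A worker would call `claim_next()`, run the checks, submit the decision, and then call `complete(uid)`. Calling `release_claims()` at startup re-queues anything that was in flight when the process stopped, at the cost of possibly re-running a job whose decision was already submitted.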
Retry Logic
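On-chain submissions can fail for transient reasons such as RPC timeouts or network congestion. Retry with exponential backoff before giving up: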
- Python
- TypeScript
- Rust
import asyncio
import logging

logger = logging.getLogger(__name__)


async def submit_with_retry(
    oracle_client,
    uid: str,
    demand: bytes,
    decision: bool,
    max_attempts: int = 3,
):
    for attempt in range(max_attempts):
        try:
            await oracle_client.oracle.arbitrate(uid, list(demand), decision)
            return
        except Exception as e:
            logger.warning(f"Arbitration attempt {attempt + 1} failed: {e}")
            if attempt < max_attempts - 1:
                # Exponential backoff: 1s, 2s, 4s, ...
                await asyncio.sleep(2 ** attempt)
    raise RuntimeError(f"Failed to submit arbitration after {max_attempts} attempts")
async function submitWithRetry(
arbiters: typeof charlieClient.arbiters,
uid: `0x${string}`,
demandData: `0x${string}`,
decision: boolean,
maxAttempts = 3,
) {
for (let attempt = 0; attempt < maxAttempts; attempt++) {
try {
await arbiters.general.trustedOracle.arbitrate(uid, demandData, decision);
return;
} catch (error) {
console.warn(`Arbitration attempt ${attempt + 1} failed:`, error);
if (attempt < maxAttempts - 1) {
await Bun.sleep(2 ** attempt * 1000);
}
}
}
throw new Error(`Failed to submit arbitration after ${maxAttempts} attempts`);
}
use std::time::Duration;

use eyre::eyre;
use tracing::warn;

async fn submit_with_retry(
    arbiters: &ArbitersModule,
    uid: FixedBytes<32>,
    demand: Bytes,
    decision: bool,
) -> eyre::Result<()> {
    const MAX_ATTEMPTS: u32 = 3;
    for attempt in 0..MAX_ATTEMPTS {
        match arbiters.trusted_oracle().arbitrate(uid, demand.clone(), decision).await {
            Ok(_) => return Ok(()),
            Err(e) => {
                warn!("Arbitration attempt {} failed: {}", attempt + 1, e);
                // Back off before the next attempt, but not after the last one
                if attempt + 1 < MAX_ATTEMPTS {
                    tokio::time::sleep(Duration::from_secs(2u64.pow(attempt))).await;
                }
            }
        }
    }
    Err(eyre!("Failed to submit arbitration after {} attempts", MAX_ATTEMPTS))
}
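The retry helpers above wait 1s, then 2s, between three attempts. With more attempts the delay doubles without bound, and several workers retrying at once will retry in lockstep. A common refinement is to cap the delay and randomize it ("full jitter"). The sketch below computes only the delay schedule, so it can be combined with any of the helpers above. `backoff_delays` and its parameters are hypothetical names.

```python
import random
from typing import Iterator, Optional


def backoff_delays(
    max_attempts: int,
    base: float = 1.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> Iterator[float]:
    """Yield the delay in seconds to wait after each failed attempt except the last.

    Without rng the delays are deterministic: base, 2*base, 4*base, ... up to cap.
    With rng each delay is drawn uniformly from [0, capped delay] (full jitter).
    """
    for attempt in range(max_attempts - 1):
        delay = min(cap, base * 2 ** attempt)
        yield rng.uniform(0, delay) if rng is not None else delay
```

For example, `list(backoff_delays(3))` gives `[1.0, 2.0]`, matching the schedule used above, while `backoff_delays(7, cap=10.0)` stops growing once it reaches 10 seconds.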
Reference Implementations
See the full working examples in the test suites:
- Pattern 1 (Contextless): Identity verification with signature validation
  - Python: test_identity.py
  - TypeScript: offchain-oracle-identity.test.ts
  - Rust: offchain_oracle_identity.rs
- Pattern 2 (Demand-Based): Test case validation for shell commands
  - Python: test_capitalization.py
  - TypeScript: offchain-oracle-capitalization.test.ts
  - Rust: offchain_oracle_capitalization.rs
- Pattern 3 (Asynchronous): Uptime monitoring with background worker
  - Python: test_uptime.py
  - TypeScript: offchain-oracle-uptime.test.ts
  - Rust: offchain_oracle_uptime.rs
These tests demonstrate the complete flow including escrow creation (Alice), fulfillment submission (Bob), oracle validation (Charlie), and payment collection.